package util;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

/**
 * Factory helpers for building Flink {@link KafkaSource} instances whose
 * record values are deserialized with {@link SimpleStringSchema}.
 */
public class Kafka_util {
    /** Broker address used when the caller does not supply one. */
    private static final String DEFAULT_BOOTSTRAP_SERVERS = "192.168.40.110:9092";

    /** Suffix appended to the topic name to derive the consumer group id. */
    private static final String GROUP_ID_SUFFIX = "_group";

    /**
     * Builds a Kafka source for {@code topic} against the default broker
     * ({@value #DEFAULT_BOOTSTRAP_SERVERS}).
     *
     * @param topic the Kafka topic to subscribe to; must not be null or blank
     * @return a source that emits each record's value as a {@code String}
     * @throws IllegalArgumentException if {@code topic} is null or blank
     */
    public static KafkaSource<String> getKafkaSource(String topic) {
        return getKafkaSource(topic, DEFAULT_BOOTSTRAP_SERVERS);
    }

    /**
     * Builds a Kafka source for {@code topic} against the given brokers.
     *
     * <p>The consumer group id is derived as {@code topic + "_group"}. Starting
     * offsets are configured via
     * {@code OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST)},
     * i.e. committed offsets with {@code LATEST} as the reset strategy.
     *
     * @param topic            the Kafka topic to subscribe to; must not be null or blank
     * @param bootstrapServers the Kafka bootstrap servers string passed to the builder;
     *                         must not be null or blank
     * @return a source that emits each record's value as a {@code String}
     * @throws IllegalArgumentException if either argument is null or blank
     */
    public static KafkaSource<String> getKafkaSource(String topic, String bootstrapServers) {
        // Fail fast: a null topic would otherwise silently yield the group id "null_group".
        requireNonBlank(topic, "topic");
        requireNonBlank(bootstrapServers, "bootstrapServers");

        return KafkaSource.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topic)
                .setGroupId(topic + GROUP_ID_SUFFIX)
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }

    /** Rejects null, empty, or whitespace-only arguments with a descriptive message. */
    private static void requireNonBlank(String value, String name) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(name + " must not be null or blank");
        }
    }
}
